﻿using Common.Library;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Serialize.Library;
using System;
using System.Threading.Tasks;

namespace RabbitMQ.Library
{
    /// <summary>
    /// 表示消息到达客户端发起的事件。
    /// </summary>
    /// <param name="result">EventMessageResult 事件消息对象</param>
    public delegate void ActionEvent(MessageResult result);

    /// <summary>
    /// Rabbit扩展实现类
    /// </summary>
    public class RabbitExtension : IRabbitExtension
    {
        #region 实例
        /// <summary>
        /// RabbitMqClient 数据上下文。
        /// </summary>
        public RabbitContext Context { get; set; }

        /// <summary>
        /// 事件激活委托实例。
        /// </summary>
        private ActionEvent _actionMessage;

        /// <summary>
        /// 当侦听的队列中有消息到达时触发的执行事件。
        /// </summary>
        public event ActionEvent ActionEventMessage
        {
            add
            {
                if (_actionMessage.IsNull())
                    _actionMessage += value;
            }
            remove
            {
                if (_actionMessage.IsNotNull())
                    _actionMessage -= value;
            }
        }
        #endregion

        #region 渠道

        /// <summary>
        /// 生成渠道
        /// </summary>
        /// <param name="config"></param>
        public IModel CreateChannel()
        {
            IModel AssistChannel = null;
            try
            {
                Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                using (Context.AssistConnection)
                {
                    Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                    AssistChannel = Context.AssistChannel;
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return AssistChannel;
        }
        #endregion

        #region 交换机

        /// <summary>
        /// 增加交换机
        /// </summary>
        /// <param name="config"></param>
        public bool CreateExchange(ExchangesInfo config)
        {
            bool result = false;
            try
            {
                Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                using (Context.AssistConnection)
                {
                    Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                    using (Context.AssistChannel)
                    {
                        if (!config.Exchange.IsNullOrEmpty()
                            && !config.Type.IsNullOrEmpty())
                        {
                            Context.AssistChannel.ExchangeDeclare(config.Exchange, config.Type, config.Durable, config.AutoDelete, config.Arguments);
                            result = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return result;
        }

        /// <summary>
        /// 删除交换机
        /// </summary>
        /// <param name="config"></param>
        public bool RemoveExchange(string exchangeName)
        {
            bool result = false;

            try
            {
                if (!exchangeName.IsNullOrEmpty())
                {
                    Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.AssistConnection)
                    {
                        Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                        using (Context.AssistChannel)
                        {
                            Context.AssistChannel.ExchangeDelete(exchangeName);
                            result = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return result;
        }

        /// <summary>
        /// 创建Exchange到Exchange的路由规则
        /// </summary>
        /// <param name="config"></param>
        public bool CreateExchangeToExchange(RoutingRuleInfo config)
        {
            bool result = false;

            try
            {
                if (!config.To.IsNullOrEmpty() && !config.From.IsNullOrEmpty() && !config.RoutingKey.IsNullOrEmpty())
                {
                    Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.AssistConnection)
                    {
                        Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                        using (Context.AssistChannel)
                        {
                            Context.AssistChannel.ExchangeBind(destination: config.To,
                                                                source: config.From,
                                                                routingKey: config.RoutingKey,
                                                                arguments: config.Arguments);
                            result = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
               LogLocation.WriteException("RabbitMQClient", exception);
            }

            return result;
        }
        #endregion

        #region 队列
        /// <summary>
        /// 增加队列信息
        /// </summary>
        /// <param name="config"></param>
        public QueueDeclareOk CreateQueue(QueuesInfo config)
        {
            QueueDeclareOk queueDeclareOk = null;

            try
            {
                if (!config.Queue.IsNullOrEmpty())
                {
                    Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.AssistConnection)
                    {
                        Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                        using (Context.AssistChannel)
                        {
                            //直接传入参数，没法执行成功；使用 "":"" 方式可以成功。
                            queueDeclareOk = Context.AssistChannel.QueueDeclare(queue: config.Queue,
                                                durable: config.Durable,
                                                exclusive: config.Exclusive,
                                                autoDelete: config.AutoDelete,
                                                arguments: config.Arguments);

                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }

            return queueDeclareOk;
        }

        /// <summary>
        /// 删除队列
        /// </summary>
        /// <param name="config"></param>
        public bool RemoveQueue(string queueName)
        {
            bool result = false;
            try
            {
                if (!queueName.IsNullOrEmpty())
                {
                    Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.AssistConnection)
                    {
                        Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                        using (Context.AssistChannel)
                        {
                            Context.AssistChannel.QueueDelete(queueName);
                            result = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return result;
        }

        /// <summary>
        /// 清空队列
        /// </summary>
        /// <param name="config"></param>
        public uint PurgeQueue(string queueName)
        {
            uint purgeCount = 0;

            try
            {
                if (!queueName.IsNullOrEmpty())
                {
                    Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.AssistConnection)
                    {
                        Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                        using (Context.AssistChannel)
                        {
                            purgeCount = Context.AssistChannel.QueuePurge(queueName);
                        }
                    }
                }

            }
            catch (Exception exception)
            {

                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return purgeCount;
        }
        #endregion

        #region Exchange到Queue路由规则
        /// <summary>
        /// 创建Exchange到Queue的路由规则
        /// </summary>
        /// <param name="config"></param>
        public bool CreateExchangeToQueue(RoutingRuleInfo config)
        {
            bool result = false;
            try
            {
                if (!config.To.IsNullOrEmpty() && !config.From.IsNullOrEmpty() && !config.RoutingKey.IsNullOrEmpty())
                {
                    Context.AssistConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.AssistConnection)
                    {
                        Context.AssistChannel = RabbitContext.CreateModel(Context.AssistConnection); //获取通道

                        using (Context.AssistChannel)
                        {
                            Context.AssistChannel.QueueBind(queue: config.To, exchange: config.From, routingKey: config.RoutingKey, config.Arguments);
                            result = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return result;
        }
        #endregion

        #region 消息发送

        /// <summary>
        /// 触发一个事件且将事件打包成消息发送到远程队列中。
        /// </summary>
        /// <param name="publishMessage">发送的消息实例。</param>
        /// <param name="exchange">RabbitMq的Exchange名称。</param>
        /// <param name="routingKey">routingKey</param>
        /// <param name="deliveryMode">持久化状态 Non-persistent (1) or persistent (2)</param>
        public bool SendEventMessage(MessagePublish publishMessage, string exchange, string routingKey, byte deliveryMode = 2)
        {
            bool result = false;
            try
            {
                if (!exchange.IsNullOrEmpty() && !routingKey.IsNullOrEmpty())
                {
                    Context.SendConnection = RabbitContext.CreateConnection(); //获取连接

                    using (Context.SendConnection)
                    {
                        Context.SendChannel = RabbitContext.CreateModel(Context.SendConnection); //获取通道

                        using (Context.SendChannel)
                        {
                            var properties = Context.SendChannel.CreateBasicProperties();
                            properties.DeliveryMode = deliveryMode;//表示持久化消息  Non-persistent (1) or persistent (2).
                                                                   //推送消息
                            Context.SendChannel.BasicPublish(exchange, routingKey, properties, MessageSerializer.SerializerToBytes(publishMessage, publishMessage.Flag));
                            result = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
            return result;
        }

        #endregion

        #region 接受消息

        /// <summary>
        /// 开始侦听默认的队列
        /// </summary>
        public void OnListening(string listenQueueName)
        {
            Task.Run(() =>
            {
                Context.ListenConnection = RabbitContext.CreateConnection(); //获取连接

                Context.ListenConnection.ConnectionShutdown += (o, e) =>
                {
                   LogLocation.WriteInfo("RabbitMQClient", "connection shutdown:" + e.ReplyText);
                };

                Context.ListenChannel = RabbitContext.CreateModel(Context.ListenConnection); //获取通道

                var consumer = new EventingBasicConsumer(Context.ListenChannel); //创建事件驱动的消费者类型
                consumer.Received += consumer_Received;

                Context.ListenChannel.BasicQos(0, 1, false); //一次只获取一个消息进行消费

                Context.ListenChannel.BasicConsume(listenQueueName.IsNullOrEmpty() ? Context.ListenQueueName : listenQueueName, false, consumer);
            });
        }

        /// <summary>
        /// 接受到消息
        /// </summary>
        private void consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            try
            {
                var result = MessageManager.BuildMessageResult(e.Body); //获取消息返回对象

                if (result.Data.IsNotNull() && _actionMessage.IsNotNull())
                    _actionMessage(result); //触发外部侦听事件

                if (!result.Handled)
                {
                    //未能消费此消息，重新放入队列头
                    string msgStr = System.Text.Encoding.UTF8.GetString(e.Body);
                    sendDeadLetter(msgStr);
                    //Context.ListenChannel.BasicReject(e.DeliveryTag, true);
                    //死信交换机:用于处理消费者,消费失败回退的消息,根据死信交换机的routingkey发送到死信队列,交换机类型 topic EXAMPLE:
                }

                if (!Context.ListenChannel.IsClosed)
                {
                    //确认一个或多个已发送的消息。
                    Context.ListenChannel.BasicAck(e.DeliveryTag, false);
                }
            }
            catch (Exception exception)
            {
                Dispose();
                LogLocation.WriteException("RabbitMQClient", exception);
            }
        }

        /// <summary>
        /// 将发送到死信队列
        /// </summary>
        private void sendDeadLetter(string deadLetter)
        {
            var routingRule = ConfigManager.config.RoutingRules.Find(p => p.Key == "3234510b623b4dbaa2f7a978b685f62d");
            if (routingRule.IsNotNull())
            {
                Context.SendConnection = RabbitContext.CreateConnection(); //获取连接

                using (Context.SendConnection)
                {
                    Context.SendChannel = RabbitContext.CreateModel(Context.SendConnection); //获取通道

                    using (Context.SendChannel)
                    {
                        var properties = Context.SendChannel.CreateBasicProperties();
                        properties.DeliveryMode = 2;//表示持久化消息  Non-persistent (1) or persistent (2).
                                                    //推送消息
                        Context.SendChannel.BasicPublish(routingRule.From, routingRule.RoutingKey, properties, JsonHelper.ToJsonBytes(deadLetter));

                        LogLocation.WriteInfo("RabbitMQClient", "发送死信:" + deadLetter);
                    }
                }
            }
        }

        #endregion

        #region IDispose

        /// <summary>
        /// Dispose.
        /// </summary>
        public void Dispose()
        {
            if (Context.SendConnection.IsNull()) return;

            if (Context.SendConnection.IsOpen)
                Context.SendConnection.Close();

            Context.SendConnection.Dispose();
        }

        #endregion
    }
}
